<!DOCTYPE html>
<html lang="en-US">
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width,initial-scale=1">
    <title>JavaKeeper</title>
    <meta name="generator" content="VuePress 1.5.4">
    <link rel="icon" href="/icon.svg">
    <script>
        var _hmt = _hmt || [];
        (function() {
            var hm = document.createElement("script");
            hm.src = "https://hm.baidu.com/hm.js?a949a9b30eb86ac0159e735ff8670c03";
            var s = document.getElementsByTagName("script")[0];
            s.parentNode.insertBefore(hm, s);
            // 引入谷歌,不需要可删除这段
            var hm1 = document.createElement("script");
            hm1.src = "https://www.googletagmanager.com/gtag/js?id=UA-169923503-1";
            var s1 = document.getElementsByTagName("script")[0]; 
            s1.parentNode.insertBefore(hm1, s1);
        })();
        // 谷歌加载,不需要可删除
        window.dataLayer = window.dataLayer || [];
        function gtag(){dataLayer.push(arguments);}
        gtag('js', new Date());
        gtag('config', 'UA-169923503-1');
    </script>
    <meta name="description" content="">
    <meta name="keywords" content="JavaKeeper,Java,Java开发,算法,blog">
    <link rel="preload" href="/assets/css/0.styles.91f57736.css" as="style"><link rel="preload" href="/assets/js/app.447d4224.js" as="script"><link rel="preload" href="/assets/js/3.9d76740c.js" as="script"><link rel="preload" href="/assets/js/1.c4fd7d2e.js" as="script"><link rel="preload" href="/assets/js/152.369c9362.js" as="script"><link rel="prefetch" href="/assets/js/10.8cf3be2c.js"><link rel="prefetch" href="/assets/js/100.74f35ab8.js"><link rel="prefetch" href="/assets/js/101.7a062346.js"><link rel="prefetch" href="/assets/js/102.c9485235.js"><link rel="prefetch" href="/assets/js/103.d88a3805.js"><link rel="prefetch" href="/assets/js/104.6e034144.js"><link rel="prefetch" href="/assets/js/105.d22f7450.js"><link rel="prefetch" href="/assets/js/106.a6cb54b0.js"><link rel="prefetch" href="/assets/js/107.7b65e72b.js"><link rel="prefetch" href="/assets/js/108.eb5804bb.js"><link rel="prefetch" href="/assets/js/109.05f775e5.js"><link rel="prefetch" href="/assets/js/11.c54ae13c.js"><link rel="prefetch" href="/assets/js/110.51d3d641.js"><link rel="prefetch" href="/assets/js/111.022b64a7.js"><link rel="prefetch" href="/assets/js/112.da8afd52.js"><link rel="prefetch" href="/assets/js/113.05a17b18.js"><link rel="prefetch" href="/assets/js/114.8960d913.js"><link rel="prefetch" href="/assets/js/115.67919f68.js"><link rel="prefetch" href="/assets/js/116.62b0cd71.js"><link rel="prefetch" href="/assets/js/117.ebac3eff.js"><link rel="prefetch" href="/assets/js/118.ecd629bd.js"><link rel="prefetch" href="/assets/js/119.a09a0897.js"><link rel="prefetch" href="/assets/js/12.60aa3b24.js"><link rel="prefetch" href="/assets/js/120.bf639d3d.js"><link rel="prefetch" href="/assets/js/121.b89d0c8e.js"><link rel="prefetch" href="/assets/js/122.1a75ff83.js"><link rel="prefetch" href="/assets/js/123.d2127132.js"><link rel="prefetch" href="/assets/js/124.2caff9e0.js"><link rel="prefetch" href="/assets/js/125.9b9f966a.js"><link rel="prefetch" href="/assets/js/126.58cdfb3d.js"><link rel="prefetch" href="/assets/js/127.8ef09c53.js"><link rel="prefetch" href="/assets/js/128.efdc2ae4.js"><link rel="prefetch" href="/assets/js/129.e35cbc57.js"><link rel="prefetch" href="/assets/js/13.125c13a0.js"><link rel="prefetch" href="/assets/js/130.f01a55e3.js"><link rel="prefetch" href="/assets/js/131.65205f4a.js"><link rel="prefetch" href="/assets/js/132.f42c5a0a.js"><link rel="prefetch" href="/assets/js/133.9ba468b3.js"><link rel="prefetch" href="/assets/js/134.7b597ba9.js"><link rel="prefetch" href="/assets/js/135.fb828b9a.js"><link rel="prefetch" href="/assets/js/136.3887532f.js"><link rel="prefetch" href="/assets/js/137.549bae01.js"><link rel="prefetch" href="/assets/js/138.db8d423d.js"><link rel="prefetch" href="/assets/js/139.dbaf2267.js"><link rel="prefetch" href="/assets/js/14.bd1d0b0d.js"><link rel="prefetch" href="/assets/js/140.6cb65fdc.js"><link rel="prefetch" href="/assets/js/141.9bd6cc4b.js"><link rel="prefetch" href="/assets/js/142.552db5ed.js"><link rel="prefetch" href="/assets/js/143.2c9f2bf4.js"><link rel="prefetch" href="/assets/js/144.fba98a15.js"><link rel="prefetch" href="/assets/js/145.c42f3a21.js"><link rel="prefetch" href="/assets/js/146.596d4d33.js"><link rel="prefetch" href="/assets/js/147.c48ae5c1.js"><link rel="prefetch" href="/assets/js/148.71064871.js"><link rel="prefetch" href="/assets/js/149.16582d21.js"><link rel="prefetch" href="/assets/js/15.f247873b.js"><link rel="prefetch" href="/assets/js/150.ead09aca.js"><link rel="prefetch" href="/assets/js/151.971fdf4b.js"><link rel="prefetch" href="/assets/js/153.371edd15.js"><link rel="prefetch" href="/assets/js/154.e090b491.js"><link rel="prefetch" href="/assets/js/155.c68bf602.js"><link rel="prefetch" href="/assets/js/156.304aea8d.js"><link rel="prefetch" href="/assets/js/157.83beef7f.js"><link rel="prefetch" href="/assets/js/158.bb1794b0.js"><link rel="prefetch" href="/assets/js/159.2d54e792.js"><link rel="prefetch" href="/assets/js/16.04336c71.js"><link rel="prefetch" href="/assets/js/160.99d56586.js"><link rel="prefetch" href="/assets/js/161.edf660aa.js"><link rel="prefetch" href="/assets/js/162.0b84606e.js"><link rel="prefetch" href="/assets/js/163.b59e0d60.js"><link rel="prefetch" href="/assets/js/164.d9eb8228.js"><link rel="prefetch" href="/assets/js/165.ca624c79.js"><link rel="prefetch" href="/assets/js/166.025b2ba1.js"><link rel="prefetch" href="/assets/js/167.abc982cc.js"><link rel="prefetch" href="/assets/js/168.27ca13dc.js"><link rel="prefetch" href="/assets/js/169.41e753a2.js"><link rel="prefetch" href="/assets/js/17.43b3c1c8.js"><link rel="prefetch" href="/assets/js/170.626319e1.js"><link rel="prefetch" href="/assets/js/171.a221dddf.js"><link rel="prefetch" href="/assets/js/172.464b2361.js"><link rel="prefetch" href="/assets/js/173.96a3afee.js"><link rel="prefetch" href="/assets/js/174.116607c2.js"><link rel="prefetch" href="/assets/js/175.ea3e8659.js"><link rel="prefetch" href="/assets/js/176.7d7b8afc.js"><link rel="prefetch" href="/assets/js/177.a6e00aa0.js"><link rel="prefetch" href="/assets/js/178.1f93afaf.js"><link rel="prefetch" href="/assets/js/179.3aa00dcd.js"><link rel="prefetch" href="/assets/js/18.d81b44d5.js"><link rel="prefetch" href="/assets/js/180.f8b2b75a.js"><link rel="prefetch" href="/assets/js/181.8e11258a.js"><link rel="prefetch" href="/assets/js/182.22243941.js"><link rel="prefetch" href="/assets/js/183.d051fdf6.js"><link rel="prefetch" href="/assets/js/184.a994075e.js"><link rel="prefetch" href="/assets/js/185.776c7e16.js"><link rel="prefetch" href="/assets/js/186.f1887955.js"><link rel="prefetch" href="/assets/js/187.da0d3626.js"><link rel="prefetch" href="/assets/js/188.8dfc358f.js"><link rel="prefetch" href="/assets/js/189.dcac5a59.js"><link rel="prefetch" href="/assets/js/19.1b3d66e1.js"><link rel="prefetch" href="/assets/js/190.c7e413d0.js"><link rel="prefetch" href="/assets/js/191.d9806121.js"><link rel="prefetch" href="/assets/js/192.869791da.js"><link rel="prefetch" href="/assets/js/193.2d74c4c8.js"><link rel="prefetch" href="/assets/js/194.c73a1909.js"><link rel="prefetch" href="/assets/js/195.e8c74834.js"><link rel="prefetch" href="/assets/js/20.bd5949ec.js"><link rel="prefetch" href="/assets/js/21.3fcf98cf.js"><link rel="prefetch" href="/assets/js/22.2fa1e2e8.js"><link rel="prefetch" href="/assets/js/23.1ae64bb4.js"><link rel="prefetch" href="/assets/js/24.7bdf7387.js"><link rel="prefetch" href="/assets/js/25.392c436e.js"><link rel="prefetch" href="/assets/js/26.58acbd4b.js"><link rel="prefetch" href="/assets/js/27.c725bdd5.js"><link rel="prefetch" href="/assets/js/28.6c9bda1e.js"><link rel="prefetch" href="/assets/js/29.e656b537.js"><link rel="prefetch" href="/assets/js/30.2c326fc7.js"><link rel="prefetch" href="/assets/js/31.e6c9fa30.js"><link rel="prefetch" href="/assets/js/32.c9c88437.js"><link rel="prefetch" href="/assets/js/33.0c53373c.js"><link rel="prefetch" href="/assets/js/34.9821e543.js"><link rel="prefetch" href="/assets/js/35.de8253eb.js"><link rel="prefetch" href="/assets/js/36.d182f929.js"><link rel="prefetch" href="/assets/js/37.9fa79014.js"><link rel="prefetch" href="/assets/js/38.9bebff76.js"><link rel="prefetch" href="/assets/js/39.19a3a2d4.js"><link rel="prefetch" href="/assets/js/4.564edb9d.js"><link rel="prefetch" href="/assets/js/40.cca6955f.js"><link rel="prefetch" href="/assets/js/41.854cd09a.js"><link rel="prefetch" href="/assets/js/42.ca7b612f.js"><link rel="prefetch" href="/assets/js/43.87027d58.js"><link rel="prefetch" href="/assets/js/44.8c2b4f4b.js"><link rel="prefetch" href="/assets/js/45.dffb4e08.js"><link rel="prefetch" href="/assets/js/46.f58049a5.js"><link rel="prefetch" href="/assets/js/47.6854070c.js"><link rel="prefetch" href="/assets/js/48.6cd9fa3d.js"><link rel="prefetch" href="/assets/js/49.e8861afa.js"><link rel="prefetch" href="/assets/js/5.5c31d62f.js"><link rel="prefetch" href="/assets/js/50.703bffab.js"><link rel="prefetch" href="/assets/js/51.6655c373.js"><link rel="prefetch" href="/assets/js/52.deb2eb09.js"><link rel="prefetch" href="/assets/js/53.6e0ed77d.js"><link rel="prefetch" href="/assets/js/54.b05c58ad.js"><link rel="prefetch" href="/assets/js/55.49c8164e.js"><link rel="prefetch" href="/assets/js/56.a5574e6b.js"><link rel="prefetch" href="/assets/js/57.58cb0de4.js"><link rel="prefetch" href="/assets/js/58.52345112.js"><link rel="prefetch" href="/assets/js/59.663ce78d.js"><link rel="prefetch" href="/assets/js/6.a9df34ee.js"><link rel="prefetch" href="/assets/js/60.f06adde2.js"><link rel="prefetch" href="/assets/js/61.170255a1.js"><link rel="prefetch" href="/assets/js/62.9d120050.js"><link rel="prefetch" href="/assets/js/63.70cced6b.js"><link rel="prefetch" href="/assets/js/64.577f3548.js"><link rel="prefetch" href="/assets/js/65.c037b29d.js"><link rel="prefetch" href="/assets/js/66.7dd1045f.js"><link rel="prefetch" href="/assets/js/67.d3aa4d6c.js"><link rel="prefetch" href="/assets/js/68.526dbb61.js"><link rel="prefetch" href="/assets/js/69.58269266.js"><link rel="prefetch" href="/assets/js/7.6609d4d6.js"><link rel="prefetch" href="/assets/js/70.64108f1b.js"><link rel="prefetch" href="/assets/js/71.1e95e0a6.js"><link rel="prefetch" href="/assets/js/72.42e7ec94.js"><link rel="prefetch" href="/assets/js/73.dad4e1c5.js"><link rel="prefetch" href="/assets/js/74.28ea286a.js"><link rel="prefetch" href="/assets/js/75.dd6d4c6f.js"><link rel="prefetch" href="/assets/js/76.ca6539df.js"><link rel="prefetch" href="/assets/js/77.feb13b0e.js"><link rel="prefetch" href="/assets/js/78.321e90e6.js"><link rel="prefetch" href="/assets/js/79.68eb8fcf.js"><link rel="prefetch" href="/assets/js/8.396d51fd.js"><link rel="prefetch" href="/assets/js/80.4edb5321.js"><link rel="prefetch" href="/assets/js/81.735d7e57.js"><link rel="prefetch" href="/assets/js/82.fa120bdf.js"><link rel="prefetch" href="/assets/js/83.bf755f94.js"><link rel="prefetch" href="/assets/js/84.9b32070c.js"><link rel="prefetch" href="/assets/js/85.592aca7c.js"><link rel="prefetch" href="/assets/js/86.4dcd9e73.js"><link rel="prefetch" href="/assets/js/87.a9e546aa.js"><link rel="prefetch" href="/assets/js/88.2a423212.js"><link rel="prefetch" href="/assets/js/89.5f455115.js"><link rel="prefetch" href="/assets/js/9.adb074c6.js"><link rel="prefetch" href="/assets/js/90.5202da0a.js"><link rel="prefetch" href="/assets/js/91.02cee99d.js"><link rel="prefetch" href="/assets/js/92.f16bad0b.js"><link rel="prefetch" href="/assets/js/93.f933634f.js"><link rel="prefetch" href="/assets/js/94.8e7b1d65.js"><link rel="prefetch" href="/assets/js/95.ee0e4a0a.js"><link rel="prefetch" href="/assets/js/96.e21d78c2.js"><link rel="prefetch" href="/assets/js/97.c87e514e.js"><link rel="prefetch" href="/assets/js/98.d123ac92.js"><link rel="prefetch" href="/assets/js/99.92d1b416.js">
    <link rel="stylesheet" href="/assets/css/0.styles.91f57736.css">
  </head>
  <body>
    <div id="app" data-server-rendered="true"><div class="theme-container no-sidebar" data-v-3ba18f14><div data-v-3ba18f14><div id="loader-wrapper" class="loading-wrapper" data-v-041fef5b data-v-3ba18f14 data-v-3ba18f14><div class="loader-main" data-v-041fef5b><div data-v-041fef5b></div><div data-v-041fef5b></div><div data-v-041fef5b></div><div data-v-041fef5b></div></div> <!----> <!----></div> <div class="password-shadow password-wrapper-out" style="display:none;" data-v-68139a52 data-v-3ba18f14 data-v-3ba18f14><h3 class="title" style="display:none;" data-v-68139a52 data-v-68139a52>JavaKeeper</h3> <!----> <label id="box" class="inputBox" style="display:none;" data-v-68139a52 data-v-68139a52><input type="password" value="" data-v-68139a52> <span data-v-68139a52>Konck! Knock!</span> <button data-v-68139a52>OK</button></label> <div class="footer" style="display:none;" data-v-68139a52 data-v-68139a52><span data-v-68139a52><i class="iconfont reco-theme" data-v-68139a52></i> <a target="blank" href="https://vuepress-theme-reco.recoluan.com" data-v-68139a52>vuePress-theme-reco</a></span> <span data-v-68139a52><i class="iconfont reco-copyright" data-v-68139a52></i> <a data-v-68139a52><span data-v-68139a52>海星</span>
            
          <!---->
          2020
        </a></span></div></div> <div class="hide" data-v-3ba18f14><header class="navbar" data-v-3ba18f14><div class="sidebar-button"><svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" role="img" viewBox="0 0 448 512" class="icon"><path fill="currentColor" d="M436 124H12c-6.627 0-12-5.373-12-12V80c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12z"></path></svg></div> <a href="/" class="home-link router-link-active"><!----> <span class="site-name">JavaKeeper</span></a> <div class="links"><div class="color-picker"><a class="color-button"><i class="iconfont reco-color"></i></a> <div class="color-picker-menu" style="display:none;"><div class="mode-options"><h4 class="title">Choose mode</h4> <ul class="color-mode-options"><li class="dark">dark</li><li class="auto active">auto</li><li class="light">light</li></ul></div></div></div> <div class="search-box"><i class="iconfont reco-search"></i> <input aria-label="Search" autocomplete="off" spellcheck="false" value=""> <!----></div> <nav class="nav-links can-hide"><div class="nav-item"><a href="/java/" class="nav-link"><i class="iconfont undefined"></i>
  Java
</a></div><div class="nav-item"><a href="/data-structure-algorithms/" class="nav-link"><i class="iconfont undefined"></i>
  数据结构与算法
</a></div><div class="nav-item"><a href="/data-store/" class="nav-link"><i class="iconfont undefined"></i>
  数据存储与缓存
</a></div><div class="nav-item"><a href="/interview/" class="nav-link"><i class="iconfont undefined"></i>
  直击面试
</a></div> <a href="https://github.com/Jstarfish/JavaKeeper" target="_blank" rel="noopener noreferrer" class="repo-link"><i class="iconfont reco-github"></i>
    GitHub
    <svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></nav></div></header> <div class="sidebar-mask" data-v-3ba18f14></div> <aside class="sidebar" data-v-3ba18f14><div class="personal-info-wrapper" data-v-5f6acefd data-v-3ba18f14><!----> <h3 class="name" data-v-5f6acefd>
    海星
  </h3> <div class="num" data-v-5f6acefd><div data-v-5f6acefd><h3 data-v-5f6acefd>0</h3> <h6 data-v-5f6acefd>Article</h6></div> <div data-v-5f6acefd><h3 data-v-5f6acefd>0</h3> <h6 data-v-5f6acefd>Tag</h6></div></div> <hr data-v-5f6acefd></div> <nav class="nav-links"><div class="nav-item"><a href="/java/" class="nav-link"><i class="iconfont undefined"></i>
  Java
</a></div><div class="nav-item"><a href="/data-structure-algorithms/" class="nav-link"><i class="iconfont undefined"></i>
  数据结构与算法
</a></div><div class="nav-item"><a href="/data-store/" class="nav-link"><i class="iconfont undefined"></i>
  数据存储与缓存
</a></div><div class="nav-item"><a href="/interview/" class="nav-link"><i class="iconfont undefined"></i>
  直击面试
</a></div> <a href="https://github.com/Jstarfish/JavaKeeper" target="_blank" rel="noopener noreferrer" class="repo-link"><i class="iconfont reco-github"></i>
    GitHub
    <svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></nav> <!----> </aside> <div class="password-shadow password-wrapper-in" style="display:none;" data-v-68139a52 data-v-3ba18f14><h3 class="title" style="display:none;" data-v-68139a52 data-v-68139a52></h3> <!----> <label id="box" class="inputBox" style="display:none;" data-v-68139a52 data-v-68139a52><input type="password" value="" data-v-68139a52> <span data-v-68139a52>Konck! Knock!</span> <button data-v-68139a52>OK</button></label> <div class="footer" style="display:none;" data-v-68139a52 data-v-68139a52><span data-v-68139a52><i class="iconfont reco-theme" data-v-68139a52></i> <a target="blank" href="https://vuepress-theme-reco.recoluan.com" data-v-68139a52>vuePress-theme-reco</a></span> <span data-v-68139a52><i class="iconfont reco-copyright" data-v-68139a52></i> <a data-v-68139a52><span data-v-68139a52>海星</span>
            
          <!---->
          2020
        </a></span></div></div> <div data-v-3ba18f14><main class="page"><div class="page-title" style="display:none;"><h1 class="title"></h1> <div data-v-5d8dbdb4><i class="iconfont reco-account" data-v-5d8dbdb4><span data-v-5d8dbdb4>海星</span></i> <!----> <!----> <!----></div></div> <div class="theme-reco-content content__default" style="display:none;"><blockquote><p>https://www.cnblogs.com/huxi2b/p/6223228.html</p></blockquote> <h2 id="消费者组"><a href="#消费者组" class="header-anchor">#</a> 消费者组</h2> <p><strong>1 什么是消费者组</strong></p> <p>其实对于这些基本概念的普及，网上资料实在太多了。我本不应该再画蛇添足了，但为了本文的完整性，我还是要花一些篇幅来重谈consumer group，至少可以说说我的理解。值得一提的是，由于我们今天基本上只探讨consumer group，对于单独的消费者不做过多讨论。</p> <p>什么是consumer group? 一言以蔽之，consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组，那么组内必然可以有多个消费者或消费者实例(consumer instance)，它们共享一个公共的ID，即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然，每个分区只能由同一个消费组内的一个consumer来消费。（网上文章中说到此处各种炫目多彩的图就会紧跟着抛出来，我这里就不画了，请原谅）。个人认为，理解consumer group记住下面这三个特性就好了：</p> <ul><li>consumer group下可以有一个或多个consumer instance，consumer instance可以是一个进程，也可以是一个线程</li> <li>group.id是一个字符串，唯一标识一个consumer group</li> <li>consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)</li></ul> <p><strong>2 消费者位置(consumer position)</strong></p> <p>消费者在消费的过程中需要记录自己消费了多少数据，即消费位置信息。在Kafka中这个位置信息有个专门的术语：位移(offset)。很多消息引擎都把这部分信息保存在服务器端(broker端)。这样做的好处当然是实现简单，但会有三个主要的问题：1. broker从此变成有状态的，会影响伸缩性；2. 需要引入应答机制(acknowledgement)来确认消费成功。3. 由于要保存很多consumer的offset信息，必然引入复杂的数据结构，造成资源浪费。而Kafka选择了不同的方式：每个consumer group保存自己的位移信息，那么只需要简单的一个整数表示位置就够了；同时可以引入checkpoint机制定期持久化，简化了应答机制的实现。</p> <p><strong>3 位移管理(offset management)</strong></p> <p><strong>3.1 自动VS手动</strong></p> <p>Kafka默认是定期帮你自动提交位移的(enable.auto.commit = true)，你当然可以选择手动提交位移实现自己控制。另外kafka会定期把group消费情况保存起来，做成一个offset map，如下图所示：</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226175429711-638862783.png" alt="img"></p> <p>上图中表明了test-group这个组当前的消费情况。</p> <p><strong>3.2 位移提交</strong></p> <p>老版本的位移是提交到zookeeper中的，图就不画了，总之目录结构是：/consumers/&lt;<a href="http://group.id/" target="_blank" rel="noopener noreferrer">group.id<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a>&gt;/offsets/<topic>/<partitionId>，但是zookeeper其实并不适合进行大批量的读写操作，尤其是写操作。因此kafka提供了另一种解决方案：增加__consumeroffsets topic，将offset信息写入这个topic，摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。依然以上图中的consumer group为例，格式大概如下：</partitionId></topic></p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226175522507-1842910668.png" alt="img"></p> <p>__consumers_offsets topic配置了compact策略，使得它总是能够保存最新的位移信息，既控制了该topic总体的日志容量，也能实现保存最新offset的目的。compact的具体原理请参见：<a href="https://kafka.apache.org/documentation/#compaction" target="_blank" rel="noopener noreferrer">Log Compaction<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></p> <p>至于每个group保存到__consumers_offsets的哪个分区，如何查看的问题请参见这篇文章：<a href="http://www.cnblogs.com/huxi2b/p/6061110.html" target="_blank" rel="noopener noreferrer">Kafka 如何读取offset topic内容 (__consumer_offsets)<svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" x="0px" y="0px" viewBox="0 0 100 100" width="15" height="15" class="icon outbound"><path fill="currentColor" d="M18.8,85.1h56l0,0c2.2,0,4-1.8,4-4v-32h-8v28h-48v-48h28v-8h-32l0,0c-2.2,0-4,1.8-4,4v56C14.8,83.3,16.6,85.1,18.8,85.1z"></path> <polygon fill="currentColor" points="45.7,48.7 51.3,54.3 77.2,28.5 77.2,37.2 85.2,37.2 85.2,14.9 62.8,14.9 62.8,22.9 71.5,22.9"></polygon></svg></a></p> <p><strong>4 Rebalance</strong></p> <p><strong>4.1 什么是rebalance？</strong></p> <p>rebalance本质上是一种协议，规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer，它订阅了一个具有100个分区的topic。正常情况下，Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。</p> <p><strong>4.2 什么时候rebalance？</strong></p> <p>这也是经常被提及的一个问题。rebalance的触发条件有三种：</p> <ul><li>组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到)</li> <li>订阅主题数发生变更——这当然是可能的，如果你使用了正则表达式的方式进行订阅，那么新建匹配正则表达式的topic就会触发rebalance</li> <li>订阅主题的分区数发生变更</li></ul> <p><strong>4.3 如何进行组内分区分配？</strong></p> <p>之前提到了group下的所有consumer都会协调在一起共同参与分配，这是如何完成的？Kafka新版本consumer默认提供了两种分配策略：range和round-robin。当然Kafka采用了可插拔式的分配策略，你可以创建自己的分配器以实现不同的分配策略。实际上，由于目前range和round-robin两种分配器都有一些弊端，Kafka社区已经提出第三种分配器来实现更加公平的分配策略，只是目前还在开发中。我们这里只需要知道consumer group默认已经帮我们把订阅topic的分区分配工作做好了就行了。</p> <p>简单举个例子，假设目前某个consumer group下有两个consumer： A和B，当第三个成员加入时，kafka会触发rebalance并根据默认的分配策略重新为A、B和C分配分区，如下图所示：</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226175710289-1164779517.png" alt="img"></p> <p><strong>4.4 谁来执行rebalance和consumer group管理？</strong></p> <p>Kafka提供了一个角色：coordinator来执行对于consumer group的管理。坦率说kafka对于coordinator的设计与修改是一个很长的故事。最新版本的coordinator也与最初的设计有了很大的不同。这里我只想提及两次比较大的改变。</p> <p>首先是0.8版本的coordinator，那时候的coordinator是依赖zookeeper来实现对于consumer group的管理的。Coordinator监听zookeeper的/consumers/<group>/ids的子节点变化以及/brokers/topics/<topic>数据变化来判断是否需要进行rebalance。group下的每个consumer都自己决定要消费哪些分区，并把自己的决定抢先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下注册。很明显，这种方案要依赖于zookeeper的帮助，而且每个consumer是单独做决定的，没有那种“大家属于一个组，要协商做事情”的精神。</partition></topic></group></topic></group></p> <p>基于这些潜在的弊端，0.9版本的kafka改进了coordinator的设计，提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。这个group coordinator比原来承担了更多的责任，比如组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候，它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。显而易见，这种coordinator设计不再需要zookeeper了，性能上可以得到很大的提升。后面的所有部分我们都将讨论最新版本的coordinator设计。</p> <p><strong>4.5 如何确定coordinator？</strong></p> <p>上面简单讨论了新版coordinator的设计，那么consumer group如何确定自己的coordinator是谁呢？ 简单来说分为两步：</p> <ul><li>确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式：
<ul><li>__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)  注意：groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定，默认是50个分区。</li></ul></li> <li>该分区leader所在的broker就是被选定的coordinator</li></ul> <p><strong>4.6 Rebalance Generation</strong></p> <p>JVM GC的分代收集就是这个词(严格来说是generational)，我这里把它翻译成“届”好了，它表示了rebalance之后的一届成员，主要是用于保护consumer group，隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。我们有时候可以看到ILLEGAL_GENERATION的错误，就是kafka在抱怨这件事情。每次group进行rebalance之后，generation号都会加1，表示group进入到了一个新的版本，如下图所示： Generation 1时group有3个成员，随后成员2退出组，coordinator触发rebalance，consumer group进入Generation 2，之后成员4加入，再次触发rebalance，group进入Generation 3.</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226175822570-898409869.png" alt="img"></p> <p><strong>4.7 协议(protocol)</strong></p> <p>前面说过了， rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题：</p> <ul><li>Heartbeat请求：consumer需要定期给coordinator发送心跳来表明自己还活着</li> <li>LeaveGroup请求：主动告诉coordinator我要离开consumer group</li> <li>SyncGroup请求：group leader把分配方案告诉组内所有成员</li> <li>JoinGroup请求：成员请求加入组</li> <li>DescribeGroup请求：显示组的所有信息，包括成员信息，协议名称，分配方案，订阅信息等。通常该请求是给管理员使用</li></ul> <p>Coordinator在rebalance的时候主要用到了前面4种请求。
<strong>4.8 liveness</strong></p> <p>consumer如何向coordinator证明自己还活着？ 通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间，那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了，那么它就会开启新一轮rebalance，并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”，告诉其他consumer：不好意思各位，你们重新申请加入组吧！</p> <p><strong>4.9 Rebalance过程</strong></p> <p>终于说到consumer group执行rebalance的具体流程了。很多用户估计对consumer内部的工作机制也很感兴趣。下面就跟大家一起讨论一下。当然我必须要明确表示，rebalance的前提是coordinator已经确定了。</p> <p>总体而言，rebalance分为2步：Join和Sync</p> <p>1 Join， 顾名思义就是加入组。这一步中，所有成员都向coordinator发送JoinGroup请求，请求入组。一旦所有成员都发送了JoinGroup请求，coordinator会从中选择一个consumer担任leader的角色，并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。</p> <p>2 Sync，这一步leader开始分配消费方案，即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配，leader会将这个方案封装进SyncGroup请求中发给coordinator，非leader也会发SyncGroup请求，只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。</p> <p>还是拿几张图来说明吧，首先是加入组的过程:</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226175922086-1237318351.png" alt="img"></p> <p>值得注意的是， 在coordinator收集到所有成员请求前，它会把已收到请求放入一个叫purgatory(炼狱)的地方。记得国内有篇文章以此来证明kafka开发人员都是很有文艺范的，写得也是比较有趣，有兴趣可以去搜搜。
然后是分发分配方案的过程，即SyncGroup请求：</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226180005242-1302422077.png" alt="img"></p> <p>注意！！ consumer group的分区分配方案是在客户端执行的！Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。比如这种机制下我可以实现类似于Hadoop那样的机架感知(rack-aware)分配方案，即为consumer挑选同一个机架下的分区数据，减少网络传输的开销。Kafka默认为你提供了两种分配策略：range和round-robin。由于这不是本文的重点，这里就不再详细展开了，你只需要记住你可以覆盖consumer的参数：partition.assignment.strategy来实现自己分配策略就好了。</p> <p><strong>4.10 consumer group状态机</strong></p> <p>和很多kafka组件一样，group也做了个状态机来表明组状态的流转。coordinator根据这个状态机会对consumer group做不同的处理，如下图所示(完全是根据代码注释手动画的，多见谅吧)</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161226180046945-1657832046.png" alt="img"></p> <p>简单说明下图中的各个状态：</p> <ul><li>Dead：组内已经没有任何成员的最终状态，组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response： UNKNOWN_MEMBER_ID</li> <li>Empty：组内无成员，但是位移信息还没有过期。这种状态只能响应JoinGroup请求</li> <li>PreparingRebalance：组准备开启新的rebalance，等待成员加入</li> <li>AwaitingSync：正在等待leader consumer将分配方案传给各个成员</li> <li>Stable：rebalance完成！可以开始消费了~</li></ul> <p>至于各个状态之间的流程条件以及action，这里就不具体展开了。</p> <p><strong>三、rebalance场景剖析</strong></p> <p>上面详细阐述了consumer group是如何执行rebalance的，可能依然有些云里雾里。这部分对其中的三个重要的场景做详尽的时序展开，进一步加深对于consumer group内部原理的理解。由于图比较直观，所有的描述都将以图的方式给出，不做过多的文字化描述了。</p> <p><strong>1 新成员加入组(member join)</strong></p> <p><strong><img src="https://images2017.cnblogs.com/blog/735367/201801/735367-20180122172838209-863721577.png" alt="img"></strong></p> <p><strong>2 组成员崩溃(member failure)</strong></p> <p>前面说过了，组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事，coordinator有可能需要一个完整的session.timeout周期才能检测到这种崩溃，这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance；而崩溃则是被动地发起rebalance。okay，直接上图：</p> <p><img src="https://images2017.cnblogs.com/blog/735367/201801/735367-20180122172921209-2006292699.png" alt="img"></p> <p><strong>3 组成员主动离组（member leave group)</strong></p> <p><img src="https://images2017.cnblogs.com/blog/735367/201801/735367-20180122172958600-838820663.png" alt="img"></p> <p><strong>4 提交位移(member commit offset)</strong></p> <p><img src="https://images2017.cnblogs.com/blog/735367/201801/735367-20180122173024959-506110104.png" alt="img"></p> <p>总结一下，本文着重讨论了一下新版本的consumer group的内部设计原理，特别是consumer group与coordinator之间的交互过程，希望对各位有所帮助</p> <blockquote><p>https://www.cnblogs.com/huxi2b/p/6124937.html</p> <p>https://www.cnblogs.com/huxi2b/p/7089854.html</p></blockquote> <p>Kafka 0.9版本开始推出了Java版本的consumer，优化了coordinator的设计以及摆脱了对zookeeper的依赖。社区最近也在探讨正式用这套consumer API替换Scala版本的consumer的计划。鉴于目前这方面的资料并不是很多，本文将尝试给出一个利用KafkaConsumer编写的多线程消费者实例，希望对大家有所帮助。</p> <p>这套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer)，普通的单线程使用方法官网API已有介绍，这里不再赘述了。因此，我们直奔主题——讨论一下如何创建多线程的方式来使用KafkaConsumer。KafkaConsumer和KafkaProducer不同，后者是线程安全的，因此我们鼓励用户在多个线程中共享一个KafkaProducer实例，这样通常都要比每个线程维护一个KafkaProducer实例效率要高。但对于KafkaConsumer而言，它不是线程安全的，所以实现多线程时通常由两种实现方法：</p> <p>1 每个线程维护一个KafkaConsumer</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161202105906443-1609157006.png" alt="img"></p> <p>2 维护一个或多个KafkaConsumer，同时维护多个事件处理线程(worker thread)</p> <p><img src="https://images2015.cnblogs.com/blog/735367/201612/735367-20161202110008787-550483601.png" alt="img"></p> <p>当然，这种方法还可以有多个变种：比如每个worker线程有自己的处理队列。consumer根据某种规则或逻辑将消息放入不同的队列。不过总体思想还是相同的，故这里不做过多展开讨论了。</p> <p>下表总结了两种方法的优缺点：</p> <table><thead><tr><th></th> <th>优点</th> <th>缺点</th></tr></thead> <tbody><tr><td>方法1(每个线程维护一个KafkaConsumer)</td> <td>方便实现 速度较快，因为不需要任何线程间交互 易于维护分区内的消息顺序</td> <td>更多的TCP连接开销(每个线程都要维护若干个TCP连接) consumer数受限于topic分区数，扩展性差 频繁请求导致吞吐量下降 线程自己处理消费到的消息可能会导致超时，从而造成rebalance</td></tr> <tr><td>方法2 (单个(或多个)consumer，多个worker线程)</td> <td>可独立扩展consumer数和worker数，伸缩性好</td> <td>实现麻烦通常难于维护分区内的消息顺序处理链路变长，导致难以保证提交位移的语义正确性</td></tr></tbody></table> <p>下面我们分别实现这两种方法。需要指出的是，下面的代码都是最基本的实现，并没有考虑很多编程细节，比如如何处理错误等。</p> <p><strong>方法1</strong></p> <p><strong>ConsumerRunnable类</strong></p> <p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <div class="language- extra-class"><pre class="language-text"><code> 1 import org.apache.kafka.clients.consumer.ConsumerRecord;
 2 import org.apache.kafka.clients.consumer.ConsumerRecords;
 3 import org.apache.kafka.clients.consumer.KafkaConsumer;
 4 
 5 import java.util.Arrays;
 6 import java.util.Properties;
 7 
 8 public class ConsumerRunnable implements Runnable {
 9 
10     // 每个线程维护私有的KafkaConsumer实例
11     private final KafkaConsumer&lt;String, String&gt; consumer;
12 
13     public ConsumerRunnable(String brokerList, String groupId, String topic) {
14         Properties props = new Properties();
15         props.put(&quot;bootstrap.servers&quot;, brokerList);
16         props.put(&quot;group.id&quot;, groupId);
17         props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);        //本例使用自动提交位移
18         props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
19         props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
20         props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
21         props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
22         this.consumer = new KafkaConsumer&lt;&gt;(props);
23         consumer.subscribe(Arrays.asList(topic));   // 本例使用分区副本自动分配策略
24     }
25 
26     @Override
27     public void run() {
28         while (true) {
29             ConsumerRecords&lt;String, String&gt; records = consumer.poll(200);   // 本例使用200ms作为获取超时时间
30             for (ConsumerRecord&lt;String, String&gt; record : records) {
31                 // 这里面写处理消息的逻辑，本例中只是简单地打印消息
32                 System.out.println(Thread.currentThread().getName() + &quot; consumed &quot; + record.partition() +
33                         &quot;th message with offset: &quot; + record.offset());
34             }
35         }
36     }
37 }
</code></pre></div><p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <p><strong>ConsumerGroup类</strong></p> <p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <div class="language- extra-class"><pre class="language-text"><code> 1 package com.my.kafka.test;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 public class ConsumerGroup {
 7 
 8     private List&lt;ConsumerRunnable&gt; consumers;
 9 
10     public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
11         consumers = new ArrayList&lt;&gt;(consumerNum);
12         for (int i = 0; i &lt; consumerNum; ++i) {
13             ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
14             consumers.add(consumerThread);
15         }
16     }
17 
18     public void execute() {
19         for (ConsumerRunnable task : consumers) {
20             new Thread(task).start();
21         }
22     }
23 }
</code></pre></div><p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <p><strong>ConsumerMain类</strong></p> <p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <div class="language- extra-class"><pre class="language-text"><code> 1 public class ConsumerMain {
 2 
 3     public static void main(String[] args) {
 4         String brokerList = &quot;localhost:9092&quot;;
 5         String groupId = &quot;testGroup1&quot;;
 6         String topic = &quot;test-topic&quot;;
 7         int consumerNum = 3;
 8 
 9         ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
10         consumerGroup.execute();
11     }
12 }
</code></pre></div><p><strong>方法2</strong></p> <p><strong>Worker类</strong></p> <div class="language- extra-class"><pre class="language-text"><code> 1 import org.apache.kafka.clients.consumer.ConsumerRecord;
 2 
 3 public class Worker implements Runnable {
 4 
 5     private ConsumerRecord&lt;String, String&gt; consumerRecord;
 6 
 7     public Worker(ConsumerRecord record) {
 8         this.consumerRecord = record;
 9     }
10 
11     @Override
12     public void run() {
13         // 这里写你的消息处理逻辑，本例中只是简单地打印消息
14         System.out.println(Thread.currentThread().getName() + &quot; consumed &quot; + consumerRecord.partition()
15             + &quot;th message with offset: &quot; + consumerRecord.offset());
16     }
17 }
</code></pre></div><p><strong>ConsumerHandler类</strong></p> <div class="language- extra-class"><pre class="language-text"><code> 1 import org.apache.kafka.clients.consumer.ConsumerRecord;
 2 import org.apache.kafka.clients.consumer.ConsumerRecords;
 3 import org.apache.kafka.clients.consumer.KafkaConsumer;
 4 
 5 import java.util.Arrays;
 6 import java.util.Properties;
 7 import java.util.concurrent.ArrayBlockingQueue;
 8 import java.util.concurrent.ExecutorService;
 9 import java.util.concurrent.ThreadPoolExecutor;
10 import java.util.concurrent.TimeUnit;
11 
12 public class ConsumerHandler {
13 
14     // 本例中使用一个consumer将消息放入后端队列，你当然可以使用前一种方法中的多实例按照某张规则同时把消息放入后端队列
15     private final KafkaConsumer&lt;String, String&gt; consumer;
16     private ExecutorService executors;
17 
18     public ConsumerHandler(String brokerList, String groupId, String topic) {
19         Properties props = new Properties();
20         props.put(&quot;bootstrap.servers&quot;, brokerList);
21         props.put(&quot;group.id&quot;, groupId);
22         props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);
23         props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
24         props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
25         props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
26         props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
27         consumer = new KafkaConsumer&lt;&gt;(props);
28         consumer.subscribe(Arrays.asList(topic));
29     }
30 
31     public void execute(int workerNum) {
32         executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
33                 new ArrayBlockingQueue&lt;&gt;(1000), new ThreadPoolExecutor.CallerRunsPolicy());
34 
35         while (true) {
36             ConsumerRecords&lt;String, String&gt; records = consumer.poll(200);
37             for (final ConsumerRecord record : records) {
38                 executors.submit(new Worker(record));
39             }
40         }
41     }
42 
43     public void shutdown() {
44         if (consumer != null) {
45             consumer.close();
46         }
47         if (executors != null) {
48             executors.shutdown();
49         }
50         try {
51             if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
52                 System.out.println(&quot;Timeout.... Ignore for this case&quot;);
53             }
54         } catch (InterruptedException ignored) {
55             System.out.println(&quot;Other thread interrupted this shutdown, ignore for this case.&quot;);
56             Thread.currentThread().interrupt();
57         }
58     }
59 
60 }
</code></pre></div><p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <p><strong>Main类</strong></p> <p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <div class="language- extra-class"><pre class="language-text"><code> 1 public class Main {
 2 
 3     public static void main(String[] args) {
 4         String brokerList = &quot;localhost:9092,localhost:9093,localhost:9094&quot;;
 5         String groupId = &quot;group2&quot;;
 6         String topic = &quot;test-topic&quot;;
 7         int workerNum = 5;
 8 
 9         ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
10         consumers.execute(workerNum);
11         try {
12             Thread.sleep(1000000);
13         } catch (InterruptedException ignored) {}
14         consumers.shutdown();
15     }
16 }
</code></pre></div><p>[<img src="https://common.cnblogs.com/images/copycode.gif" alt="复制代码">](javascript:void(0)😉</p> <p>总结一下，这两种方法或是模型都有各自的优缺点，在具体使用时需要根据自己实际的业务特点来选取对应的方法。就我个人而言，我比较推崇第二种方法以及背后的思想，即不要将很重的处理逻辑放入消费者的代码中，很多Kafka consumer使用者碰到的各种rebalance超时、coordinator重新选举、心跳无法维持等问题都来源于此。</p></div> <footer class="page-edit" style="display:none;"><!----> <!----></footer> <!----> <!----> <!----></main> <!----></div></div></div></div><div class="global-ui"><div class="back-to-ceiling" style="right:1rem;bottom:6rem;width:2.5rem;height:2.5rem;border-radius:.25rem;line-height:2.5rem;display:none;" data-v-db14854a data-v-db14854a><svg t="1574745035067" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="5404" class="icon" data-v-db14854a><path d="M526.60727968 10.90185116a27.675 27.675 0 0 0-29.21455937 0c-131.36607665 82.28402758-218.69155461 228.01873535-218.69155402 394.07834331a462.20625001 462.20625001 0 0 0 5.36959153 69.94390903c1.00431239 6.55289093-0.34802892 13.13561351-3.76865779 18.80351572-32.63518765 54.11355614-51.75690182 118.55860487-51.7569018 187.94566865a371.06718723 371.06718723 0 0 0 11.50484808 91.98906777c6.53300375 25.50556257 41.68394495 28.14064038 52.69160883 4.22606766 17.37162448-37.73630017 42.14135425-72.50938081 72.80769204-103.21549295 2.18761121 3.04276886 4.15646224 6.24463696 6.40373557 9.22774369a1871.4375 1871.4375 0 0 0 140.04691725 5.34970492 1866.36093723 1866.36093723 0 0 0 140.04691723-5.34970492c2.24727335-2.98310674 4.21612437-6.18497483 6.3937923-9.2178004 30.66633723 30.70611158 55.4360664 65.4791928 72.80769147 103.21549355 11.00766384 23.91457269 46.15860503 21.27949489 52.69160879-4.22606768a371.15156223 371.15156223 0 0 0 11.514792-91.99901164c0-69.36717486-19.13165746-133.82216804-51.75690182-187.92578088-3.42062944-5.66790279-4.76302748-12.26056868-3.76865837-18.80351632a462.20625001 462.20625001 0 0 0 5.36959269-69.943909c-0.00994388-166.08943902-87.32547796-311.81420293-218.6915546-394.09823051zM605.93803103 357.87693858a93.93749974 93.93749974 0 1 1-187.89594924 6.1e-7 93.93749974 93.93749974 0 0 1 187.89594924-6.1e-7z" p-id="5405" data-v-db14854a></path><path d="M429.50777625 765.63860547C429.50777625 803.39355007 466.44236686 1000.39046097 512.00932183 1000.39046097c45.56695499 0 82.4922232-197.00623328 82.5015456-234.7518555 0-37.75494459-36.9345906-68.35043303-82.4922232-68.34111062-45.57627738-0.00932239-82.52019037 30.59548842-82.51086798 68.34111062z" p-id="5406" data-v-db14854a></path></svg></div><!----></div></div>
    <script src="/assets/js/app.447d4224.js" defer></script><script src="/assets/js/3.9d76740c.js" defer></script><script src="/assets/js/1.c4fd7d2e.js" defer></script><script src="/assets/js/152.369c9362.js" defer></script>
  </body>
</html>
